Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17642: PreVote response handling and ProspectiveState #18240

Open
wants to merge 23 commits into
base: trunk
Choose a base branch
from

Conversation

ahuang98
Copy link
Contributor

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote
Jira: https://issues.apache.org/jira/browse/KAFKA-16164
Implements items 2-4 which cover response handling and new ProspectiveState:

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community core Kafka Broker kraft clients labels Dec 18, 2024
Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank @ahuang98 . Here is a partial review.

@github-actions github-actions bot removed the triage PRs from the community label Dec 19, 2024
@ahuang98 ahuang98 marked this pull request as ready for review December 23, 2024 16:05
Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @ahuang98 . Quick review of the changes to **/src/main.

Comment on lines 3017 to 3023
// if (quorum.isVoter()) {
// canElectNewLeaderAfterOldLeaderPartitioned fails if we do not bump epoch since it is possible
// that the replica ends up as follower in the same epoch.
// resigned(leaderId=local) -> prospective(leaderId=local) -> follower(leaderId=local) which is illegal
// transitionToProspective(quorum.epoch() + 1, currentTimeMs);
// transitionToCandidate(currentTimeMs);
// } else {
Copy link
Contributor Author

@ahuang98 ahuang98 Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing raft event simulation tests picked up on a new bug in pollResigned - if we simply replace the transitionToCandidate(currentTimeMs) with transitionToProspective(currentTimeMs), a cordoned leader in epoch 5 could resign in epoch 5, transition to prospective in epoch 5 (with leaderId=localId), fail election and then attempt to become follower of itself in epoch 5.

so far, these are the alternatives which seem reasonable to me:

  • resigned voter in epoch X should transition to prospective in epoch X+1
    • cons: need to create a special code path just for this case to allow becoming prospective in epoch+1 (would also add trivial complexity for determining if votedKey or leaderId should be kept from prior transition). transitioning to prospective in epoch + 1 is almost as disruptive as transitioning directly to candidate since it involves an epoch bump
    • pro: probably the option which follows intentions of past logic most closely
  • resigned voter in epoch X should simply transition to unattached in epoch X+1 (current version)
    • con: resigned replica has to wait two election timeouts after resignation to become prospective
    • pro: simplified logic. unless this is the only replica eligible for leadership in the quorum (e.g. due to network partitioning), the impact of waiting two election timeouts after resignation is small - all other replicas should be starting their own elections within a single fetch timeout/election timeout
  • resigned voter in epoch X instead waits a smaller backoffTimeMs before transitioning to unattached in epoch X+1
    • con: scope creep - what should this backoff be? additional changes to resignedState
    • pro: resigned voter waits less time before becoming eligible to start a new election.

Copy link
Member

@jsancio jsancio Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing raft event simulation tests picked up on a new bug in pollResigned

What is the exact error? Let's add an unittest to one of the KafkaRaftClient*Test suite that shows the bug.

attempt to become follower of itself in epoch 5.

Let's add a check to transtitionToFollower that checks that leaderId is not equal to localId.

It makes sense to me that after the resign state the replica should always increase its epoch. The replica resigned from leadership at epoch X so eventually the epoch will be at least X + 1. Did you consider transitioning to candidate and relaxing the transition functions to allow both resigned and prospective to transition to candidate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, transitionToUnattached has existing logic for assigning election timeouts which we can borrow - we can just add an additional if clause that if we came from resignedState, assign electionTimeout to resignedState.electionTimeout which is effectively 0

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahuang98 review to reply to your comments.

Comment on lines 3017 to 3023
// if (quorum.isVoter()) {
// canElectNewLeaderAfterOldLeaderPartitioned fails if we do not bump epoch since it is possible
// that the replica ends up as follower in the same epoch.
// resigned(leaderId=local) -> prospective(leaderId=local) -> follower(leaderId=local) which is illegal
// transitionToProspective(quorum.epoch() + 1, currentTimeMs);
// transitionToCandidate(currentTimeMs);
// } else {
Copy link
Member

@jsancio jsancio Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing raft event simulation tests picked up on a new bug in pollResigned

What is the exact error? Let's add an unittest to one of the KafkaRaftClient*Test suite that shows the bug.

attempt to become follower of itself in epoch 5.

Let's add a check to transtitionToFollower that checks that leaderId is not equal to localId.

It makes sense to me that after the resign state the replica should always increase its epoch. The replica resigned from leadership at epoch X so eventually the epoch will be at least X + 1. Did you consider transitioning to candidate and relaxing the transition functions to allow both resigned and prospective to transition to candidate?

@ahuang98
Copy link
Contributor Author

It makes sense to me that after the resign state the replica should always increase its epoch. The replica resigned from leadership at epoch X so eventually the epoch will be at least X + 1. Did you consider transitioning to candidate and relaxing the transition functions to allow both resigned and prospective to transition to candidate?

yes, I decided not to list that as an option because I felt it was equal to if not worse than the option of having resigned transition to prospective in epoch X + 1. personally I felt it was nicer to have less edge cases to the invariant that only prospective should transition to candidate

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the changes to src/main.

prospective.epochElection().rejectingVoters()
);
prospectiveTransitionAfterElectionLoss(prospective, currentTimeMs);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add an else case and throw an illegal state exception.

* On election loss, if replica is prospective it will transition to unattached or follower state.
* If replica is candidate, it will start backing off.
*/
private void maybeHandleElectionLoss(long currentTimeMs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about passing the NomineeState object, checking the subtype of that object and casting to the appropriate subtype.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the following?

    private void maybeHandleElectionLoss(NomineeState state, long currentTimeMs) {
        if (state instanceof CandidateState) {
            CandidateState candidate = (CandidateState) state;
        ...
        else if (state instanceof ProspectiveState) {
            ProspectiveState prospective = (ProspectiveState) state;
        ...

Is the intention of the additional parameter to make it clear this method should be called on NomineeState? This seems a bit redundant with the existing QuorumState helpers (e.g. isCandidate() and candidateStateOrThrow()).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We discussed this offline.

if (prospective.epochElection().isVoteRejected()) {
logger.info(
"Insufficient remaining votes to become candidate (rejected by {}). ",
prospective.epochElection().rejectingVoters()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just printing the entire epochElection? It may be useful to know the state of the entire voter set not just the rejecting voters.

logger.info(
"Insufficient remaining votes to become leader (rejected by {}). " +
"We will backoff before retrying election again",
candidate.epochElection().rejectingVoters()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just printing the entire epochElection? It may be useful to know the state of the entire voter set not just the rejecting voters.

raft/src/main/java/org/apache/kafka/raft/QuorumState.java Outdated Show resolved Hide resolved
@Override
public String toString() {
return String.format(
"VoterState(%s, state=%s)",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                "VoterState(replicaKey=%s, state=%s)",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicaKey's toString method contains the class name so I didn't want to be redundant - String.format("ReplicaKey(id=%d, directoryId=%s)", id, directoryId);

Comment on lines 172 to 178
return String.format(
"EpochElection(%s)",
voterStates.values().stream()
.map(VoterState::toString)
.collect(
Collectors.joining(", "))
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just print the map?

        return String.format(
            "EpochElection(voterStates=%s)",
            voterStates
        );

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it felt redundant to print the keys given that the replica ids are also contained in the values. since this is would only be used for debugging though, I'll take your suggestion and just print the entire map

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick feedback.

@@ -667,7 +668,8 @@ private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs
}

private boolean maybeTransitionToCandidate(ProspectiveState state, long currentTimeMs) {
if (state.epochElection().isVoteGranted()) {
// If replica is the only voter, it should transition to candidate immediately
if (state.epochElection().isVoteGranted() || quorum.isOnlyVoter()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the quorum has a size of one and since the replica votes for itself when transitioning to prospective, isVoteGranted() should always return true. If so, the replica doesn't need to check if it is the only voter.

Let's confirm we have a test for this in KafkaRaftClientTest. If not, let's add a test.

Let's also confirm that we have a test for this in ProspectiveStateTest and CandidateStateTest. If not, let's add tests for these cases.

Copy link
Contributor Author

@ahuang98 ahuang98 Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added four tests for this, starting at testInitializeAsOnlyVoterWithEmptyElectionState

confirm that we have a test for this in ProspectiveStateTest and CandidateStateTest

Confirmed!

transitionToCandidate(currentTimeMs);
// When there is only a single voter, become prospective immediately.
// transitionToProspective will handle short-circuiting voter to candidate state
if (quorum.isOnlyVoter() && !quorum.isNomineeState() && !quorum.isLeader()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you check that is not leader? In KRaft a replica should never start as a leader. KRaft throws and illegal state exception if it starts as leader. See line 545 above.

        if (quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");

Copy link
Contributor Author

@ahuang98 ahuang98 Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, technically the replica can transition to leader due to the above conditional.
We can improve this conditional by directly checking if the replica is Unattached or Follower, and merge this conditional into the above conditional

@@ -154,6 +155,163 @@ private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
return ReplicaKey.of(id, directoryId);
}

@ParameterizedTest
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've organized QuorumStateTest in the following way - misc tests were pulled to the front. All other tests are organized under banners (e.g. Initialization tests, Tests of transitions from state X)

@@ -302,7 +303,7 @@ public void testGrantVotesWhenShuttingDown(boolean withKip853Rpc) throws Excepti

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsResignedAndBecomeCandidate(boolean withKip853Rpc) throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is misleading here. This test was just removed because I found it was a duplicate of testInitializeAsResignedLeaderFromStateStore

@@ -162,16 +218,18 @@ public void shouldRecordVoterQuorumState(KRaftVersion kraftVersion) {
getMetric(metrics, "current-vote-directory-id").metricValue()
);
assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue());
assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); // todo, bug fix
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsancio the HW drops to -1L after candidate transitions to leader - if you agree this is a bug I'll file a Jira for this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants